-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add AcknowledgementSet support to DocumentDB/MongoDB streams #4379
Conversation
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
} | ||
} | ||
parentThread.interrupt(); | ||
executorService.shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should L88-L89 be in init method instead of the callable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, once this monitor exists because of ack wait timeout, it should stop the stream worker.
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've only taken a cursory look and haven't looked at the logic yet.
this.partitionAcknowledgmentTimeout = partitionAcknowledgmentTimeout; | ||
this.acknowledgementMonitorWaitTimeInMs = acknowledgementMonitorWaitTimeInMs; | ||
this.checkPointIntervalInMs = checkPointIntervalInMs; | ||
executorService = Executors.newSingleThreadExecutor(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use the BackgroundThreadFactory
for all ExecutorServices
going forward.
Here is an example of using it:
Line 48 in 19b18a1
executorService = Executors.newFixedThreadPool(s3SourceConfig.getNumWorkers(), BackgroundThreadFactory.defaultExecutorThreadFactory("s3-source-sqs")); |
This thread factory will give us more useful names and also make the thread a daemon thread to ensure Data Prepper shuts down properly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
executor.submit(exportScheduler); | ||
executor.submit(exportWorker); | ||
executor.submit(streamScheduler); | ||
executor = Executors.newFixedThreadPool(runnableList.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, use the BackgroundThreadFactory
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
if (recordCount % defaultFlushBatchSize == 0) { | ||
LOG.debug("Write to buffer for line " + (recordCount - defaultFlushBatchSize) + " to " + recordCount); | ||
if (recordCount % recordFlushBatchSize == 0) { | ||
LOG.debug("Write to buffer for line " + (recordCount - recordFlushBatchSize) + " to " + recordCount); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use string interpolation:
LOG.debug("Write to buffer for line {} to {}", (recordCount - recordFlushBatchSize), recordCount);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
@@ -0,0 +1,45 @@ | |||
package org.opensearch.dataprepper.plugins.mongo.model; | |||
|
|||
public class CheckpointStatus { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class is only used by StreamAcknowledgementManager
. So you can move this into the stream
package and make it package-private to help keep it encapsulated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. will do.
} | ||
LOG.warn("Acknowledgement not received for the checkpoint {} past wait time. Giving up partition.", checkpointStatus.getResumeToken()); | ||
partitionCheckpoint.giveUpPartition(); | ||
break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand this. This will end the thread and then also the stream thread. But, would these ever start again? It seems that when this happens, this node is now done working and unable to contribute.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this will end the stream. New node or same node can acquire the partition and work from the checkpoint state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I see. The StreamScheduler
is still running and can re-acquire.
Description
Add AcknowledgementSet support to DocumentDB/MongoDB streams
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.